{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 1. Pyspark cluster Demo"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark import SparkConf, SparkContext\n",
    "## set up spark context\n",
    "from pyspark.sql import SQLContext\n",
    "sqlContext = SQLContext(sc)\n",
    "## set up  SparkSession\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"Python Spark SQL basic example\") \\\n",
    "    .config(\"spark.some.config.option\", \"some-value\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.clustering import KMeans\n",
    "from pyspark.ml.linalg import Vectors #for above 2.0 version\n",
    "#from pyspark.mllib.linalg import Vectors "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "- Creat dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+\n",
      "| features|\n",
      "+---------+\n",
      "|[0.0,0.0]|\n",
      "|[1.0,1.0]|\n",
      "|[9.0,8.0]|\n",
      "|[8.0,9.0]|\n",
      "+---------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# load data\n",
    "data = [(Vectors.dense([0.0, 0.0]),), \\\n",
    "        (Vectors.dense([1.0, 1.0]),),\\\n",
    "        (Vectors.dense([9.0, 8.0]),), \\\n",
    "        (Vectors.dense([8.0, 9.0]),)]\n",
    "df = sqlContext.createDataFrame(data, [\"features\"])\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "2"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#kmeans model\n",
    "kmeans = KMeans(k=2, seed=1)\n",
    "Kmodel = kmeans.fit(df)\n",
    "# number of cenet\n",
    "centers = Kmodel.clusterCenters()\n",
    "len(centers)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(features=DenseVector([0.0, 0.0]), prediction=1),\n",
       " Row(features=DenseVector([1.0, 1.0]), prediction=1),\n",
       " Row(features=DenseVector([9.0, 8.0]), prediction=0),\n",
       " Row(features=DenseVector([8.0, 9.0]), prediction=0)]"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "transformed = Kmodel.transform(df).select(\"features\", \"prediction\")\n",
    "rows = transformed.collect()\n",
    "rows"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rows[0].prediction == rows[1].prediction\n",
    "#True\n",
    "rows[2].prediction == rows[3].prediction\n",
    "# True"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 2. Pyspark cluster for iris dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.clustering import KMeans\n",
    "from pyspark.sql import SQLContext\n",
    "from pyspark.ml.linalg import Vectors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark import SparkContext\n",
    "#sc = SparkContext()\n",
    "sqlContext = SQLContext(sc) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "- Load data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "data = sqlContext.read.format('com.databricks.spark.csv').\\\n",
    "                               options(header='true', \\\n",
    "                               inferschema='true').load('./data/iris.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- sepal_length: double (nullable = true)\n",
      " |-- sepal_width: double (nullable = true)\n",
      " |-- petal_length: double (nullable = true)\n",
      " |-- petal_width: double (nullable = true)\n",
      " |-- species: string (nullable = true)\n",
      "\n",
      "+------------+-----------+------------+-----------+-------+\n",
      "|sepal_length|sepal_width|petal_length|petal_width|species|\n",
      "+------------+-----------+------------+-----------+-------+\n",
      "|         5.1|        3.5|         1.4|        0.2| setosa|\n",
      "|         4.9|        3.0|         1.4|        0.2| setosa|\n",
      "|         4.7|        3.2|         1.3|        0.2| setosa|\n",
      "|         4.6|        3.1|         1.5|        0.2| setosa|\n",
      "|         5.0|        3.6|         1.4|        0.2| setosa|\n",
      "|         5.4|        3.9|         1.7|        0.4| setosa|\n",
      "|         4.6|        3.4|         1.4|        0.3| setosa|\n",
      "|         5.0|        3.4|         1.5|        0.2| setosa|\n",
      "|         4.4|        2.9|         1.4|        0.2| setosa|\n",
      "|         4.9|        3.1|         1.5|        0.1| setosa|\n",
      "|         5.4|        3.7|         1.5|        0.2| setosa|\n",
      "|         4.8|        3.4|         1.6|        0.2| setosa|\n",
      "|         4.8|        3.0|         1.4|        0.1| setosa|\n",
      "|         4.3|        3.0|         1.1|        0.1| setosa|\n",
      "|         5.8|        4.0|         1.2|        0.2| setosa|\n",
      "|         5.7|        4.4|         1.5|        0.4| setosa|\n",
      "|         5.4|        3.9|         1.3|        0.4| setosa|\n",
      "|         5.1|        3.5|         1.4|        0.3| setosa|\n",
      "|         5.7|        3.8|         1.7|        0.3| setosa|\n",
      "|         5.1|        3.8|         1.5|        0.3| setosa|\n",
      "+------------+-----------+------------+-----------+-------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "data.printSchema()\n",
    "data.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "- Convert string data to numerical data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import StringIndexer\n",
    "feature = StringIndexer(inputCol=\"species\", outputCol=\"targetlabel\")\n",
    "target = feature.fit(data).transform(data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- sepal_length: double (nullable = true)\n",
      " |-- sepal_width: double (nullable = true)\n",
      " |-- petal_length: double (nullable = true)\n",
      " |-- petal_width: double (nullable = true)\n",
      " |-- species: string (nullable = true)\n",
      " |-- targetlabel: double (nullable = true)\n",
      "\n",
      "+------------+-----------+------------+-----------+-------+-----------+\n",
      "|sepal_length|sepal_width|petal_length|petal_width|species|targetlabel|\n",
      "+------------+-----------+------------+-----------+-------+-----------+\n",
      "|         5.1|        3.5|         1.4|        0.2| setosa|        2.0|\n",
      "|         4.9|        3.0|         1.4|        0.2| setosa|        2.0|\n",
      "|         4.7|        3.2|         1.3|        0.2| setosa|        2.0|\n",
      "|         4.6|        3.1|         1.5|        0.2| setosa|        2.0|\n",
      "|         5.0|        3.6|         1.4|        0.2| setosa|        2.0|\n",
      "|         5.4|        3.9|         1.7|        0.4| setosa|        2.0|\n",
      "|         4.6|        3.4|         1.4|        0.3| setosa|        2.0|\n",
      "|         5.0|        3.4|         1.5|        0.2| setosa|        2.0|\n",
      "|         4.4|        2.9|         1.4|        0.2| setosa|        2.0|\n",
      "|         4.9|        3.1|         1.5|        0.1| setosa|        2.0|\n",
      "|         5.4|        3.7|         1.5|        0.2| setosa|        2.0|\n",
      "|         4.8|        3.4|         1.6|        0.2| setosa|        2.0|\n",
      "|         4.8|        3.0|         1.4|        0.1| setosa|        2.0|\n",
      "|         4.3|        3.0|         1.1|        0.1| setosa|        2.0|\n",
      "|         5.8|        4.0|         1.2|        0.2| setosa|        2.0|\n",
      "|         5.7|        4.4|         1.5|        0.4| setosa|        2.0|\n",
      "|         5.4|        3.9|         1.3|        0.4| setosa|        2.0|\n",
      "|         5.1|        3.5|         1.4|        0.3| setosa|        2.0|\n",
      "|         5.7|        3.8|         1.7|        0.3| setosa|        2.0|\n",
      "|         5.1|        3.8|         1.5|        0.3| setosa|        2.0|\n",
      "+------------+-----------+------------+-----------+-------+-----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "target.printSchema()\n",
    "target.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "from pyspark.ml.clustering import KMeans\n",
    "from pyspark.ml.linalg import Vectors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# convert the data to dense vector\n",
    "def transData(row):\n",
    "    return Row(label=row[\"targetlabel\"],\n",
    "               features=Vectors.dense([row[\"sepal_length\"],\n",
    "                                       row[\"sepal_width\"],\n",
    "                                       row[\"petal_length\"],\n",
    "                                       row[\"petal_width\"]]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+-----+\n",
      "|         features|label|\n",
      "+-----------------+-----+\n",
      "|[5.1,3.5,1.4,0.2]|  2.0|\n",
      "|[4.9,3.0,1.4,0.2]|  2.0|\n",
      "|[4.7,3.2,1.3,0.2]|  2.0|\n",
      "|[4.6,3.1,1.5,0.2]|  2.0|\n",
      "|[5.0,3.6,1.4,0.2]|  2.0|\n",
      "|[5.4,3.9,1.7,0.4]|  2.0|\n",
      "|[4.6,3.4,1.4,0.3]|  2.0|\n",
      "|[5.0,3.4,1.5,0.2]|  2.0|\n",
      "|[4.4,2.9,1.4,0.2]|  2.0|\n",
      "|[4.9,3.1,1.5,0.1]|  2.0|\n",
      "|[5.4,3.7,1.5,0.2]|  2.0|\n",
      "|[4.8,3.4,1.6,0.2]|  2.0|\n",
      "|[4.8,3.0,1.4,0.1]|  2.0|\n",
      "|[4.3,3.0,1.1,0.1]|  2.0|\n",
      "|[5.8,4.0,1.2,0.2]|  2.0|\n",
      "|[5.7,4.4,1.5,0.4]|  2.0|\n",
      "|[5.4,3.9,1.3,0.4]|  2.0|\n",
      "|[5.1,3.5,1.4,0.3]|  2.0|\n",
      "|[5.7,3.8,1.7,0.3]|  2.0|\n",
      "|[5.1,3.8,1.5,0.3]|  2.0|\n",
      "+-----------------+-----+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#convert the data to Dataframe\n",
    "#Note: if your pyspark is 2.0 above, you need to convert Datafrme to rdd FIRST\n",
    "transformed = target.rdd.map(transData).toDF() \n",
    "transformed.show()"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {},
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Fit Kmeans model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+-----+----------+\n",
      "|         features|label|prediction|\n",
      "+-----------------+-----+----------+\n",
      "|[5.1,3.5,1.4,0.2]|  2.0|         0|\n",
      "|[4.9,3.0,1.4,0.2]|  2.0|         0|\n",
      "|[4.7,3.2,1.3,0.2]|  2.0|         0|\n",
      "|[4.6,3.1,1.5,0.2]|  2.0|         0|\n",
      "|[5.0,3.6,1.4,0.2]|  2.0|         0|\n",
      "|[5.4,3.9,1.7,0.4]|  2.0|         0|\n",
      "|[4.6,3.4,1.4,0.3]|  2.0|         0|\n",
      "|[5.0,3.4,1.5,0.2]|  2.0|         0|\n",
      "|[4.4,2.9,1.4,0.2]|  2.0|         0|\n",
      "|[4.9,3.1,1.5,0.1]|  2.0|         0|\n",
      "|[5.4,3.7,1.5,0.2]|  2.0|         0|\n",
      "|[4.8,3.4,1.6,0.2]|  2.0|         0|\n",
      "|[4.8,3.0,1.4,0.1]|  2.0|         0|\n",
      "|[4.3,3.0,1.1,0.1]|  2.0|         0|\n",
      "|[5.8,4.0,1.2,0.2]|  2.0|         0|\n",
      "|[5.7,4.4,1.5,0.4]|  2.0|         0|\n",
      "|[5.4,3.9,1.3,0.4]|  2.0|         0|\n",
      "|[5.1,3.5,1.4,0.3]|  2.0|         0|\n",
      "|[5.7,3.8,1.7,0.3]|  2.0|         0|\n",
      "|[5.1,3.8,1.5,0.3]|  2.0|         0|\n",
      "+-----------------+-----+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# model and predict data\n",
    "kmeans = KMeans(k=3)\n",
    "model = kmeans.fit(transformed) \n",
    "predict_data = model.transform(transformed)\n",
    "predict_data.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "train_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() \n",
    "total = predict_data.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(136, 150, 0.09333333333333338, 0.9066666666666666)"
      ]
     },
     "execution_count": 31,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "train_err, total, 1-float(train_err)/total,float(train_err)/total"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [conda root]",
   "language": "python",
   "name": "conda-root-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
